package net.bw.realtime.jtp.dwd.trade.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.bw.realtime.jtp.common.utils.HBaseUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.Map;

/*
 * @ Author：liuyawei
 * @ Date：2025-06-04
 */
/**
 * Enriches each JSON record with a {@code province_name} field.
 *
 * <p>The province dimension is loaded once per task in {@link #open(Configuration)} via
 * {@code HBaseUtil.scanData} and kept in memory; {@link #map(String)} then looks up the
 * record's {@code province_id} in that cache. Records whose province id is absent or not
 * present in the cache get the placeholder name {@code "未知"}.
 *
 * <p>NOTE(review): the cache is never refreshed after {@code open()}, so rows added to the
 * dimension table afterwards are not visible until the task restarts.
 */
public class LoadProvinceDimMapFunction extends RichMapFunction<String, String> {

    /** Arguments passed to {@code HBaseUtil.scanData} to load the dimension. */
    private static final String DIM_TABLE = "dim_base_province";
    private static final String COLUMN_FAMILY = "info";
    private static final String NAME_COLUMN = "name";

    /** JSON field read from the incoming record. */
    private static final String PROVINCE_ID_FIELD = "province_id";
    /** JSON field written to the outgoing record. */
    private static final String PROVINCE_NAME_FIELD = "province_name";
    /** Placeholder written when the province id cannot be resolved. */
    private static final String UNKNOWN_PROVINCE = "未知";

    // Lookup cache keyed by the join field (province id).
    // Populated in open(), so it is runtime-only state and must not be part of the
    // serialized function object.
    private transient Map<String, String> cacheMap;

    /**
     * Loads the province dimension into the in-memory cache.
     *
     * @param parameters configuration handed in by the runtime (unused here)
     * @throws IllegalStateException if the dimension scan yields no map at all
     * @throws Exception             if {@code HBaseUtil.scanData} fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Map<String, String> loaded = HBaseUtil.scanData(DIM_TABLE, COLUMN_FAMILY, NAME_COLUMN);
        // scanData's contract is not visible from this file; fail fast here rather than
        // with an anonymous NullPointerException on the first record.
        if (loaded == null) {
            throw new IllegalStateException(
                    "Province dimension cache could not be loaded from HBase table " + DIM_TABLE);
        }
        cacheMap = loaded;
    }

    /**
     * Adds {@code province_name} to the given JSON record.
     *
     * @param value a JSON object serialized as text
     * @return the same JSON object, serialized, with {@code province_name} set
     * @throws IllegalArgumentException if {@code value} does not parse to a JSON object
     *                                  (for example when it is null or empty)
     * @throws Exception                if JSON parsing fails
     */
    @Override
    public String map(String value) throws Exception {
        // 1. Parse the record.
        JSONObject record = JSON.parseObject(value);
        if (record == null) {
            // The parser yields null instead of throwing for null/empty text.
            throw new IllegalArgumentException(
                    "Cannot add province name: input is not a JSON object: '" + value + "'");
        }

        // 2-3. Resolve the province name for the record's province id.
        String provinceName = lookupProvinceName(record.getString(PROVINCE_ID_FIELD));

        // 4. Attach the name and 5. return the serialized record.
        record.put(PROVINCE_NAME_FIELD, provinceName);
        return record.toJSONString();
    }

    /** Returns the cached name for {@code provinceId}, or the placeholder if unresolved. */
    private String lookupProvinceName(String provinceId) {
        if (provinceId == null) {
            // Avoid probing the map with a null key; not every Map implementation allows it.
            return UNKNOWN_PROVINCE;
        }
        String name = cacheMap.get(provinceId);
        return name != null ? name : UNKNOWN_PROVINCE;
    }

    /**
     * Delegates to the superclass. This class holds no connection of its own; the cache is
     * a plain in-memory map obtained in {@link #open(Configuration)}.
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
